import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../common/test_page.dart';
import '../../utils.dart';

/// Emits the integers `0` up to (but not including) [n], one at a time.
///
/// The generator waits 100 ms before every emission, so consumers that
/// sample on a timer see the values spread out over time instead of in a
/// single burst. When [n] is zero or negative the stream closes without
/// emitting anything.
Stream<int> getStream(int n) async* {
  for (var k = 0; k < n; k++) {
    // The delay must be awaited: a future that is merely created does not
    // suspend the generator, and all values would be emitted back-to-back.
    await Future<void>.delayed(const Duration(milliseconds: 100));

    yield k;
  }
}

/// Emits `0` immediately, then the integers `1` up to (but not including)
/// [n], waiting 100 ms before each of those later values.
///
/// The leading `0` is emitted unconditionally, so the stream contains at
/// least one event even when [n] is less than one.
Stream<int> getTimeStream(int n) async* {
  yield 0;

  for (var k = 1; k < n; k++) {
    // Await the delay and then yield the counter. Casting the pending
    // future to `int` compiles but throws a TypeError at runtime.
    await Future<void>.delayed(const Duration(milliseconds: 100));

    yield k;
  }
}

/// Test page that registers the rxdart buffering scenarios: `bufferCount`,
/// `buffer`, `bufferTest` and `bufferTime`.
///
/// All scenarios are registered from the constructor through `test`. Every
/// `expect` call in this class receives only the value under test (a stream,
/// or a closure that builds one) and no matcher.
// NOTE(review): `test`, `expect` and `nullableTest` are not defined in this
// file (presumably TestPage / utils.dart) - confirm what a single-argument
// `expect` actually verifies.
class BufferTestPage extends TestPage {
  /// Creates the page, forwarding [title] to [TestPage], and registers every
  /// buffer scenario.
  BufferTestPage(super.title) {
    // Compile-time constants reused by several scenarios below.
    const oneToFour = [1, 2, 3, 4];
    const samplePeriod = Duration(milliseconds: 160);

    // Predicate used by the bufferTest scenarios.
    bool isEven(int value) => value.isEven;

    // ---------------------------------------------------------------------
    // bufferCount
    // ---------------------------------------------------------------------

    test('Rx.bufferCount.noStartBufferEvery', () async {
      final buffered = Rx.range(1, 4).bufferCount(2);

      expect(buffered);
    });

    test('Rx.bufferCount.noStartBufferEvery.includesEventOnClose', () async {
      // Five events with a count of two.
      final buffered = Rx.range(1, 5).bufferCount(2);

      expect(buffered);
    });

    test('Rx.bufferCount.startBufferEvery.count2startBufferEvery1', () async {
      final buffered = Rx.range(1, 4).bufferCount(2, 1);

      expect(buffered);
    });

    test('Rx.bufferCount.startBufferEvery.count3startBufferEvery2', () async {
      final buffered = Rx.range(1, 8).bufferCount(3, 2);

      expect(buffered);
    });

    test('Rx.bufferCount.startBufferEvery.count3startBufferEvery4', () async {
      final buffered = Rx.range(1, 8).bufferCount(3, 4);

      expect(buffered);
    });

    test('Rx.bufferCount.reusable', () async {
      // One transformer instance is applied to two independent streams.
      final countTransformer = BufferCountStreamTransformer<int>(2);

      final first = Stream.fromIterable(oneToFour).transform(countTransformer);
      expect(first);

      final second =
          Stream.fromIterable(oneToFour).transform(countTransformer);
      expect(second);
    });

    test('Rx.bufferCount.asBroadcastStream', () async {
      final broadcast = Stream.fromIterable(oneToFour).asBroadcastStream();
      final shared = broadcast.bufferCount(2);

      // The same stream instance is passed to `expect` twice.
      expect(shared);

      expect(shared);
    });

    test('Rx.bufferCount.error.shouldThrowA', () async {
      final failing = Stream<void>.error(Exception());

      expect(failing.bufferCount(2));
    });

    test('Rx.bufferCount.shouldThrow.invalidCount.negative', () {
      // The stream is built lazily inside the closure handed to `expect`.
      expect(() => Stream.fromIterable(oneToFour).bufferCount(-1));
    });

    test('Rx.bufferCount.startBufferEvery.shouldThrow.invalidStartBufferEvery',
        () {
      expect(() => Stream.fromIterable(oneToFour).bufferCount(2, -1));
    });

    test('Rx.bufferCount.nullable', () {
      nullableTest<List<String?>>((source) => source.bufferCount(1));
    });

    // ---------------------------------------------------------------------
    // buffer
    // ---------------------------------------------------------------------

    test('Rx.buffer', () async {
      final source = getStream(4);
      final sampler = Stream<void>.periodic(samplePeriod).take(3);

      expect(source.buffer(sampler));
    });

    test('Rx.buffer.sampleBeforeEvent.shouldEmit', () async {
      // 'start' is prepended to a stream whose only other event, 'end',
      // arrives after 200 ms; the sampler ticks every 40 ms, ten times.
      final delayedEnd = Future<void>.delayed(const Duration(milliseconds: 200))
          .then((_) => 'end');
      final source = Stream.fromFuture(delayedEnd).startWith('start');
      final sampler =
          Stream<void>.periodic(const Duration(milliseconds: 40)).take(10);

      expect(source.buffer(sampler));
    });

    test('Rx.buffer.shouldClose', () async {
      final source = StreamController<int>();
      for (var value = 0; value < 4; value++) {
        source.add(value);
      }

      // Closing is deferred to a microtask, after the four adds above.
      scheduleMicrotask(source.close);

      final sampler = Stream<void>.periodic(const Duration(seconds: 3));

      expect(source.stream.buffer(sampler).take(1));
    });

    test('Rx.buffer.reusable', () async {
      // One transformer instance is applied to two independent streams.
      final windowTransformer = BufferStreamTransformer<int>(
        (_) => Stream<void>.periodic(samplePeriod).take(3).asBroadcastStream(),
      );

      final first = getStream(4).transform(windowTransformer);
      expect(first);

      final second = getStream(4).transform(windowTransformer);
      expect(second);
    });

    test('Rx.buffer.asBroadcastStream', () async {
      final source = getStream(4).asBroadcastStream();
      final sampler =
          Stream<void>.periodic(samplePeriod).take(10).asBroadcastStream();
      final shared = source.buffer(sampler);

      // The same stream instance is passed to `expect` twice.
      expect(shared);

      expect(shared);
    });

    test('Rx.buffer.error.shouldThrowA', () async {
      final failing = Stream<void>.error(Exception());
      final sampler = Stream<void>.periodic(samplePeriod);

      expect(failing.buffer(sampler));
    });

    test('Rx.buffer.nullable', () {
      nullableTest<List<String?>>(
        (source) => source.buffer(Stream<void>.empty()),
      );
    });

    // ---------------------------------------------------------------------
    // bufferTest
    // ---------------------------------------------------------------------

    test('Rx.bufferTest', () async {
      final buffered = Rx.range(1, 4).bufferTest(isEven);

      expect(buffered);
    });

    test('Rx.bufferTest.reusable', () async {
      // One transformer instance is applied to two independent streams.
      final testTransformer = BufferTestStreamTransformer<int>(isEven);

      final first = Stream.fromIterable(oneToFour).transform(testTransformer);
      expect(first);

      final second = Stream.fromIterable(oneToFour).transform(testTransformer);
      expect(second);
    });

    test('Rx.bufferTest.asBroadcastStream', () async {
      final broadcast = Stream.fromIterable(oneToFour).asBroadcastStream();
      final shared = broadcast.bufferTest(isEven);

      // The same stream instance is passed to `expect` twice.
      expect(shared);

      expect(shared);
    });

    test('Rx.bufferTest.error.shouldThrowA', () async {
      final failing = Stream<int>.error(Exception());

      expect(failing.bufferTest(isEven));
    });

    test('Rx.bufferTest.nullable', () {
      nullableTest<List<String?>>(
        (source) => source.bufferTest((_) => true),
      );
    });

    // ---------------------------------------------------------------------
    // bufferTime
    // ---------------------------------------------------------------------

    test('Rx.bufferTime', () async {
      final buffered = getTimeStream(4).bufferTime(samplePeriod);

      expect(buffered);
    });

    test('Rx.bufferTime.shouldClose', () async {
      final source = StreamController<int>();
      for (var value = 0; value < 4; value++) {
        source.add(value);
      }

      // Closing is deferred to a microtask, after the four adds above.
      scheduleMicrotask(source.close);

      final buffered =
          source.stream.bufferTime(const Duration(seconds: 3)).take(1);

      expect(buffered);
    });

    test('Rx.bufferTime.reusable', () async {
      // Built from BufferStreamTransformer with a periodic window stream
      // rather than from `bufferTime` itself.
      final windowTransformer = BufferStreamTransformer<int>(
        (_) => Stream<void>.periodic(samplePeriod),
      );

      final first = getTimeStream(4).transform(windowTransformer);
      expect(first);

      final second = getTimeStream(4).transform(windowTransformer);
      expect(second);
    });

    test('Rx.bufferTime.asBroadcastStream', () async {
      final broadcast = getTimeStream(4).asBroadcastStream();
      final shared = broadcast.bufferTime(samplePeriod);

      // The same stream instance is passed to `expect` twice.
      expect(shared);

      expect(shared);
    });

    test('Rx.bufferTime.error.shouldThrowA', () async {
      final failing = Stream<void>.error(Exception());

      expect(failing.bufferTime(samplePeriod));
    });

    test('Rx.bufferTime.nullable', () {
      nullableTest<List<String?>>(
        (source) => source.bufferTime(Duration.zero),
      );
    });
  }
}